package org.databandtech.mockmq;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.databandtech.mockmq.entity.HotWord;

import com.google.gson.Gson;

/**
 * Demo producer that publishes {@value #COUNT} mock hot-word records, serialized
 * as JSON via Gson, to the Kafka topic {@value #TOPIC} using synchronous sends.
 */
public class HotWordKafkaLog {

	/** Kafka bootstrap server address. */
	final static String HOST = "192.168.13.52:9092";// "192.168.10.60:9092"
	/** Target topic for the mock records. */
	final static String TOPIC = "HOTWORDS";
	/** Number of records to send. */
	final static int COUNT = 10;
	/** Partition index — only used by the commented-out explicit-partition send below. */
	final static int PARTITION = 0;

	/** Pool of hot-search phrases a mock record picks from at random. */
	final static String[] HOTSEARCH = { "美突击检查中国赴美人员党员身份","拜登呼吁必须停止把对手当敌人","澳总理要求中方道歉 华春莹反击","编剧炮轰“丁真现象”:过度吹嘘","男生潜入女寝室杀害女友后自杀","伊朗核科学家遭远程自动机枪射击","江苏响水致78死爆炸案一审宣判","女副部被“双开”:接受高档美容"};

	/** Gson is thread-safe and nontrivial to construct — build it once, not per loop iteration. */
	private static final Gson GSON = new Gson();

	public static void main(String[] args) {

		Properties properties = new Properties();
		properties.put("bootstrap.servers", HOST);
		// acks semantics:
		//  0 : producer does not wait for any broker ack
		//  1 : leader acks after its own write
		// -1 : leader acks only after all in-sync replicas have the record
		properties.put("acks", "-1");
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		// try-with-resources guarantees close() on every exit path; close() also
		// flushes any records still buffered in the producer.
		try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties)) {
			// -- synchronous send: block on each Future before sending the next record
			for (int i = 1; i <= COUNT; i++) {
				// Build one mock record: random keyword, ISO timestamp, random CN user name
				HotWord hw = new HotWord();
				hw.setKeyword(HOTSEARCH[Mock.getNum(0, HOTSEARCH.length - 1)]);
				LocalDateTime localDateTime = LocalDateTime.now();
				hw.setTs(localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
				hw.setUserid(Mock.getCnName());

				// Explicit-partition variant (ProducerRecord has several overloads):
				// ProducerRecord<String, String> pr = new ProducerRecord<String, String>(TOPIC,PARTITION,"key"+i, msg +"--"+i);
				// No key supplied, so the partitioner assigns the default partition.
				String jsonStr = GSON.toJson(hw);
				ProducerRecord<String, String> pr = new ProducerRecord<String, String>(TOPIC, jsonStr);
				try {
					// .get() blocks until the broker acks, making the send synchronous
					RecordMetadata metadata = kafkaProducer.send(pr).get();
					System.out.println("TopicName : " + metadata.topic() + " Partiton : " + metadata
							.partition() + " Offset : " + metadata.offset() + "--" + jsonStr + i);
				} catch (InterruptedException e) {
					// Restore the interrupt flag and stop sending rather than swallowing it
					Thread.currentThread().interrupt();
					break;
				} catch (ExecutionException e) {
					// Broker-side failure for this record; report it and try the next one
					e.printStackTrace();
				}
			}
		}
	}

}
